home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Libris Britannia 4
/
science library(b).zip
/
science library(b)
/
DDJMAG
/
DDJ9212.ZIP
/
hyprcube.asc
< prev
next >
Wrap
Text File
|
1992-11-30
|
66KB
|
1,838 lines
_SIMULATING HYPERCUBES IN UNIX_
by Jeffery W. Hamilton and Eileen M. Ormsby
[LISTING ONE]
/***** cube.h *****/
/* Hypercube Simulation definitions */
#define NUMBER_IN_PART 4 /* number of nodes in partition */
#define PM_PORT 6000 /* base TCP port; the partition managers add an offset */
/* Maximum message sent between nodes */
#define MAX_MESSAGE_SIZE (1024 * 16)

/* subpart -- what a partition manager records about one partition. */
typedef struct {
    char *name;              /* network name of the computer hosting partition */
    int socket;              /* file descriptor for the socket */
    int errfdp;              /* file descriptor for sending "kill" values */
    struct sockaddr_in addr; /* socket address used to reach the partition */
} subpart;

/* message -- one slot of the shared communication area.  The header fields
** describe the payload held in msg[]; valid[] holds one flag per buffer
** index and a nonzero flag marks the message as pending for that buffer. */
typedef struct {
    int type;     /* message type sent with the message -1 or greater */
    int spid;     /* sender's group number (pid) */
    int snode;    /* sender's node number */
    int dnode;    /* node this message is destined for */
    int t_length; /* total length of message */
    int length;   /* length of the message */
    char valid[NUMBER_IN_PART+2]; /* 0= no message */
    char msg[MAX_MESSAGE_SIZE];   /* Actual message contents */
} message; /* closing line was missing; the other listings use this type name */
[LISTING TWO]
/***** pm.c *****/
/* PARTITION MANAGER -- This program will run on all partitions used for an
** application. It is started via a remote execution call from "load".
** The main program gets the input arguments, sets a few variables and calls
** the Partition Manager subroutine which performs the following functions:
** determines local partition information; allocates necessary partition
** structures; sets up interrupt handling to free system resources when the
** application is terminated; sets up server portion of socket communications;
** forks a client PM that sets up client portion of the socket communications,
** waits for a node to request data to be sent to a partition, and sends data
** over the sockets; forks and execs application children; performs server PM
** functions that waits to receive data from the sockets and notifies
** appropriate nodes when data has arrived.
** The PM server only receives data for its nodes, and the PM client sends
** data to a remote partition.
** BASIC SOFTWARE ARCHITECTURE: The load module in the simcube library will:
** 1) Read the .pmrc file; 2) Determine how many partitions will be used
** for this application; 3) Fork and exec a local PM and the appropriate
** number of remote PMs. Each PM is passed the name of the application program,
** its partition number, the key value for PM to communicate to the host
** process with the group number, the total number of application nodes,
** the names of the other partitions running this application.
** The initialization portion (init_simulator) which is called by the
** application processes (nodes) will set up interrupt handling and create
** the shared memory and semaphores necessary for communications
** between local nodes and the Partition Manager.
*/
/* These functions allow a UNIX system to simulate a hypercube environment. */
#include <sys/types.h>
#include <errno.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/ipc.h>
#include <sys/wait.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <string.h>
#include <sys/time.h>
#include <sys/utsname.h>
#include <sys/param.h>
#include "cube.h"
#define NUM_TRIES 60
#define min(x,y) (((x) < (y)) ? (x) : (y))
/* Function Prototypes */
/* Every function that is defined "static" further down is declared "static"
** here as well; a non-static declaration followed by a static definition is
** a linkage conflict.  shmat() is not redeclared because <sys/shm.h>, which
** is included above, already declares it. */
void *malloc(int size);
int pm (char *filename, char *pmsites[]);
static void setup_server_sockets(void);
static void pm_server(void);
static void setup_client_sockets(void);
static void pm_getmsg(void);
static void abort_prog(void);
void sig_terminate(int sig, int code, struct sigcontext *scp);
void unexpected_death(int sig, int code, struct sigcontext *scp);
int _killcube(int node, int pid);
static int init_shared_mem(void **pointer, int size, int key);
static int init_semaphore(int *semid, int size, int value, int key);
static int semcall_all(int semid, int size, int operation);
static int semcall_one(int semid, int num, int operation);
int numnodes(void);
static int numparts(void);
static int mypart(void);
static int partof(int node);
static int pm_partof(int node);
static int numbuffers(void);
static int mybuffer(void);
static int bufferof(int node);
int mynode(void);
int myhost(void);
static void pm_client(void);
/* Local, Private Information */
/* Descriptor sets filled in by setup_server_sockets() and used by */
/* pm_server() as the select() working sets. */
fd_set node_part_set, temp_set;
/* node_part_set is the variable that FD_XXX commands are */
/* applied to. Definitions of fd_set structure, and FD_ZERO, */
/* FD_SET, FD_CLR, and FD_ISSET macros are in <sys/types.h> */
/* node_part_set will have socket file descriptors.*/
static int num_parts; /* number of partitions */
static int my_part; /* partition this process is in */
static int nodes_in_part; /* number of nodes in this partition */
static subpart *partition; /* list of partition information */
static int base; /* base key value for allocating shared data */
static int my_node; /* node number for this process */
static int my_group; /* group id for this process */
/* There are two groups, host communications */
/* and inter-node communications */
static int num_nodes; /* total number of nodes in all partitions */
static int msgavail = -1; /* semaphores indicating message is available */
static int msgfree = -1; /* semaphores indicating buffer is free */
static int next_message = -1; /* which message is to be received next */
static int shmid_m = -1; /* id of shared area for messages */
static message *buffer = NULL;/* communication areas */
static int *children = NULL; /* process ids of all child processes */
static int child_index = 0; /* number of children created */
/* Assigned from fork() in pm(): nonzero in the forking (server) process, */
/* 0 in the forked client process. */
static int pmserver_pid = 0; /* pid of pmserver */
/* Main: reads arguments from command line, places them in local variables
** and calls pm. (Local variables are not necessary, but enhances readability)
** NOTE: ONLY TEN NODES (PM SITES) ARE READ FROM THE COMMAND LINE */
int main(int argc, char *argv[])
{
char *filename;
char *pmsites[16];
int i;
if (argc < 7 ) {
fprintf (stderr, "PM main: error not enough arguments\n");
fflush(stderr);
exit(-1);
}
filename = argv[1];
my_part = atoi(argv[2]);
base = atoi(argv[3]);
my_group = atoi(argv[4]);
num_nodes = atoi(argv[5]);
for (i = 0; i < argc - 6; i++) {
pmsites[i] = argv[i + 6];
}
pm (filename, pmsites);
}
/* pm -- Determines partition information, sets up signal handling, sets up
** server sockets, forks client pm, forks application children. PM splits the
** application into NUMBER_IN_PART processes. The partition number is passed
** as an input parameter. The starting node number is the partition number
** times NUMBER_IN_PART and remaining processes are numbered consecutively.
** Shared memory will be allocated to serve as a communications vehicle within
** a partition. Sockets used between partitions to allow multiple UNIX systems
** to be combined to create a larger set of CPUs to be applied to a problem.
** filename: program exec'ed for every node of this partition.
** pmsites:  site names, one per partition, indexed by partition number.
** Returns -1 when setup fails, when a forked child cannot exec, or when the
** client loop gives up; the server loop itself does not return. */
int pm (char *filename, char *pmsites[])
{
    int i, pid;
    int fork_errno;
    int env_len;
    size_t names_len;
    size_t need;
    char temp[128]; /* used to set up environment variables */
    char part_names[64];
    int start_node;
    void *shared = NULL;

    /* Determine how many other partitions exist */
    num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;
    /* Determine which node is the first for this partition */
    start_node = mypart() * NUMBER_IN_PART;
    /* Determine how many nodes are in this partition (1-4) */
    nodes_in_part = numnodes() - (mypart() * NUMBER_IN_PART);
    nodes_in_part = min(NUMBER_IN_PART, nodes_in_part);
    /* Set PM's node to be the last node on this partition */
    /* (The children will be start_node through start_node + nodes_in_part-1) */
    my_node = nodes_in_part;

    /* Create the structure to hold the partition names and socket fds */
    if ((partition = malloc(num_parts * sizeof(subpart))) == NULL) {
        /* mypart() used to sit inside the string literal */
        fprintf(stderr,"PM %d SERVER: insufficient memory\n", mypart());
        fflush(stderr);
        return -1;
    }
    memset(partition, 0, num_parts * sizeof(subpart));

    /* Catch these signals so PM can notify children to clean up */
    signal(SIGINT,sig_terminate);
    signal(SIGTERM,sig_terminate);
    signal(SIGQUIT,sig_terminate);
    /* Watch for unexpected deaths */
    signal(SIGCHLD, unexpected_death);

    /* Create, bind, and listen on sockets */
    setup_server_sockets();

    if (mypart() != 0) {
        /* Only change the base on partitions that are not the one that includes
        ** host. That partition requires same base that host session is using. */
        base = getpid();
    }

    /* Allocate shared memory.  A void* temporary is used because a
    ** "message **" is not a valid "void **" argument. */
    shmid_m = init_shared_mem(&shared, sizeof(message) * numbuffers(), base);
    buffer = shared;
    if (mypart() != 0) {
        memset(buffer, 0, sizeof(message) * numbuffers());
    }

    /* Allocate communications semaphores */
    init_semaphore(&msgavail, numbuffers(), 0, base+10000);
    init_semaphore(&msgfree, numbuffers(), 0, base+20000);

    /* Flush stdout and stderr before doing a fork, so child doesn't inherit */
    fflush(stdout);
    fflush(stderr);

    /* Fork PM CLIENT here */
    if ((pmserver_pid = fork()) < 0) {
        /* Can't create the PM CLIENT */
        fork_errno = errno;
        /* _killcube() signals pmserver_pid.  A failed fork leaves -1 there,
        ** and kill(-1, ...) would signal every process we may signal, so
        ** clear it first. */
        pmserver_pid = 0;
        _killcube(0, 0);
        /* With pmserver_pid cleared abort_prog() skips the IPC removal,
        ** so remove the semaphores and shared memory here. */
        if (msgavail != -1) {
            semctl(msgavail, 0, IPC_RMID, 0);
            msgavail = -1;
        }
        if (msgfree != -1) {
            semctl(msgfree, 0, IPC_RMID, 0);
            msgfree = -1;
        }
        if (shmid_m != -1) {
            shmctl(shmid_m, IPC_RMID, 0);
            shmid_m = -1;
        }
        /* The old message printed an uninitialized loop counter */
        fprintf(stderr,
                "PM %d SERVER: unable to create PM CLIENT process, errno=%d\n",
                mypart(), fork_errno);
        fflush(stderr);
        return -1;
    } else if (pmserver_pid == 0) {
        /* Fill in the names of the other sites in the partition structure and
        ** close the socket file descriptors that this process just inherited. */
        for (i = 0; i < num_parts; i++) {
            if (mypart() != i) {
                partition[i].name = pmsites[i];
                close(partition[i].socket);
            }
        }
        /* CALL CLIENT SUBROUTINES */
        setup_client_sockets();
        pm_client();
        /* pm_client() only comes back after a failed send */
        return -1;
    } else {
        /* SERVER: forks application children then calls pm_server subroutine */
        /* Read from pmsites array, create a comma delimited string for env */
        part_names[0] = '\0';
        names_len = 0;
        for (i = 0; i < num_parts; i ++) {
            need = strlen(pmsites[i]) + 1; /* name plus its comma */
            if (names_len + need >= sizeof(part_names)) {
                _killcube(0, 0);
                fprintf(stderr, "PM %d SERVER: partition name list too long\n",
                        mypart());
                fflush(stderr);
                return -1;
            }
            strcat(part_names, pmsites[i]);
            strcat(part_names, ",");
            names_len += need;
        }

        /* Allocate space for child pids */
        if ((children = malloc(nodes_in_part * sizeof(int))) == NULL) {
            /* Take the already running PM CLIENT down with us */
            _killcube(0, 0);
            fprintf(stderr,"PM %d SERVER: insufficient memory\n", mypart());
            fflush(stderr);
            return -1;
        }

        /* Load all nodes within this partition */
        for (i = start_node; (i < start_node + nodes_in_part); i++) {
            if ((pid = fork()) < 0) {
                /* Can't create all the children! */
                _killcube(0, 0);
                fprintf(stderr, "PM %d SERVER: unable to create node process %d\n",
                        mypart(), i);
                fflush(stderr);
                return -1;
            } else if (pid == 0) {
                /* I'm the child process */
                /* Start the node program */
                my_node = i;
                env_len = snprintf(temp, sizeof(temp), "SIM_INFO=%d,%d,%d,%d,%s",
                                   base,my_node,my_group,num_nodes,part_names);
                /* temp stays alive until the exec below, which is all that
                ** putenv() needs; a truncated value is treated like a
                ** putenv() failure. */
                if (env_len < 0 || (size_t)env_len >= sizeof(temp)
                    || putenv(temp) != 0) {
                    fprintf(stderr,
                            "PM %d SERVER: Insufficient room to add env variable\n",
                            my_node);
                    fflush(stderr);
                    return -1;
                }
                execlp(filename,filename,NULL);
                /* If we get here, we had a problem */
                perror("execlp");
                fprintf(stderr,
                        "PM %d SERVER: error execing node=%d file=%s errno=%d\n",
                        mypart(), my_node, filename, errno);
                fflush(stderr);
                return -1;
            } else {
                /* I'm the parent process */
                children[child_index++] = pid;
            }
        }
        /* CALL SERVER SUBROUTINE */
        pm_server();
    } /* end if PM SERVER */
    return 0;
}
/* setup_server_sockets -- SERVER SOCKETS- for all partitions except ourself:
** Create a socket. Bind the socket to a unique PORT id. (If the socket was
** in use in a prior iteration, it may not have been reset yet - therefore we
** loop a fixed number of times retrying.) Put a listen on socket. Put new
** socket file descriptor into our set.
** Any unrecoverable failure is reported on stderr and ends the process with
** exit(100). */
static void setup_server_sockets(void)
{
    int i, j;
    int bind_errno = 0;
    struct sockaddr_in part_sock;

    /* Zero out the set of partition sockets */
    FD_ZERO(&node_part_set);
    FD_ZERO(&temp_set);

    for (i = 0; i < num_parts; i++) {
        /* Skip ourself */
        if (i == mypart () )
            continue;

        for (j = 0; j < NUM_TRIES; j++) {
            /* Create a SERVER socket to receive data */
            if ((partition[i].socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                /* the format used to lack the conversion for errno */
                fprintf(stderr,
                        "PM %d SERVER: can't open stream socket, errno=%d\n",
                        mypart(), errno);
                fflush(stderr);
                exit (100);
            }
            /* Bind SERVER socket to local addr so partitions can send to it */
            memset(&part_sock, 0, sizeof(part_sock));
            part_sock.sin_family = AF_INET;
            part_sock.sin_addr.s_addr = htonl (INADDR_ANY);
            /* Create unique SERVER socket port address, up to 16 per computer */
            part_sock.sin_port = htons (PM_PORT + (mypart() << 4) + i);

            /* If socket is still in use from prev iter, keep trying to bind */
            if (bind(partition[i].socket, (struct sockaddr *)&part_sock,
                     sizeof(part_sock)) == 0) {
                /* It worked, exit the loop */
                break;
            }
            /* Remember why bind failed; close() and sleep() may change errno */
            bind_errno = errno;
            if ((bind_errno == EADDRINUSE) || (bind_errno == EINTR)) {
                /* Previous load hasn't shutdown yet, or we were interrupted. */
                close(partition[i].socket);
                sleep(2);
            } else {
                fprintf(stderr,"PM %d SERVER: can't bind local addr, errno=%d\n",
                        mypart(), bind_errno);
                fflush(stderr);
                exit(100);
            }
        }
        if (j == NUM_TRIES) {
            /* Exceeded retry limit */
            fprintf(stderr,"PM %d SERVER: can't bind local addr, errno=%d\n",
                    mypart(), bind_errno);
            fflush(stderr);
            exit(100);
        }

        /* Issue a listen for the server sockets */
        if (listen(partition[i].socket, 1) < 0) {
            fprintf(stderr,"PM %d SERVER: can't listen on %d, errno = %d\n",
                    mypart(), partition[i].socket, errno);
            fflush(stderr);
            exit(100);
        }
        /* Set the bit for the socket file descriptor */
        FD_SET(partition[i].socket, &node_part_set);
    } /* end for setting up SERVER sockets */
}
/* pm_server -- SERVER- go into a receiving loop: Copy file desciptors to a
** temporary set. Determine how many sockets are ready to be accepted. For
** each file descriptor that is ready: Find file descriptor that is ready.
** If it is found in a partition's array of fd's then it is a base socket and
** it is "accept"ed and added to the fd set. Else it is an fd that has data to
** be received. Receive the size of the message. Loop until entire message is
** received. Clear the valid indicator bits. Inform nodes that a message has
** arrived. If a broadcast message, set everyone's valid bit, and wait until
** everyone receives it. Else verify that message belongs to a node on this
** part and set that node's valid bit, wait until it is recvd. */
static void pm_server(void)
{
int i, j;
int accept_rdy;
int newsockfd, templen;
int size, count, partial;
char *target;
struct sockaddr_in tempaddr;
/* forever, accept sockets and receive data */
for ( ; ; ) {
temp_set = node_part_set;
/* Determine how many sockets are ready to be accepted */
/* FD_SETSIZE is defined in <sys/types.h> to be 200 */
if ((accept_rdy = select( FD_SETSIZE, &temp_set, 0, 0, 0)) == -1) {
if (errno != 4) {
fprintf(stderr, "PM %d SERVER: error in select, errno = %d\n",
mypart(), errno);
perror( "pm select" ) ;
fflush(stderr);
_killcube(0,0);
exit(-1);
} else {
/* We were interrupted, try again */
continue;
}
}
for (i = 1; (accept_rdy != 0) && (i < FD_SETSIZE) ; i++) {
/* Find the file descriptor that needs servicing */
if ( FD_ISSET( i, &temp_set)) {
/* temporary modification */
/* accept_rdy--; */
accept_rdy = 0;
/* Examine each partition's array of fd's to find ready one */
for (j = 0; j < num_parts; j++) {
/* Skip examining our own partition */
if (j == mypart() )
continue;
/* Since this matches our "base" socket, accept the socket */
if (i == partition[j].socket) {
newsockfd = accept(partition[j].socket,
(struct sockaddr_in *)&tempaddr, &templen);
FD_SET (newsockfd, &node_part_set);
/* Found "base" socket, break out of for each part loop */
break;
} /* end if base socket */
} /* end for check file descriptors in partition's array */
/* If it wasn't a base socket, then need to receive data */
if (j != num_parts) {
continue;
} else /* receive the data from the socket */ {
/* First receive the size of the message */
while (recv(i, &size, sizeof(size)) < 0) {
if (errno != 22) {
fprintf(stderr, "PM %d SERVER: recv size err,
errno=%d, fd=%d\n", mypart(),errno, i);
fflush(stderr);
_killcube(0,0);
exit(-1);
} else {
fprintf(stderr, "PM %d SERVER: recv size err, errno=%d,
fd=%d\n", mypart(), errno, i);
fflush(stderr);
}
} /* end while recv msg */
target = (char *) &buffer[nodes_in_part];
count = 0;
/* Now receive the message, it could come in pieces */
while (count < size) {
if ((partial = recv(i, target, size - count)) < 0) {
fprintf(stderr, "PM %d SERVER: Error recvng msg;
errno=%d\n", mypart(),errno);
fflush(stderr);
exit(-1);
}
count += partial;
target += partial;
}
/* Make sure all valid bits are cleared */
memset(buffer[nodes_in_part].valid,0,
sizeof(buffer[nodes_in_part].valid));
/* Tell the node(s) the message is there */
if (buffer[nodes_in_part].dnode == -1) {
/* Broadcast the message to nodes in this partition */
for (j=0; j < nodes_in_part; j++) {
buffer[nodes_in_part].valid[j] = 1;
}
semcall_all(msgavail,nodes_in_part, 1);
/* Wait until everyone receives the message */
semcall_one(msgfree, nodes_in_part, -nodes_in_part);
} else {
if (mypart() != partof(buffer[nodes_in_part].dnode))
{
fprintf(stderr, "PM %d SERVER: Recvd msg for node %d
not this partition\n",
mypart(), buffer[nodes_in_part].dnode);
fflush(stderr);
} else {
/* Point to point to another node in same partition */
j = bufferof(buffer[nodes_in_part].dnode);
buffer[nodes_in_part].valid[j] = 1;
semcall_one(msgavail, j, 1);
/* Wait until it is received */
semcall_one(msgfree, nodes_in_part, -1);
}
} /* endif broadcast message */
} /* endif receiving data from this socket */
} /* endif this socket */
} /* endfor */
} /* end forever receive messages on sockets */
}
/* setup_client_sockets -- Setting up CLIENT sockets- for all partitions
** except ourself: Create a socket to send data. Look up address of host,
** place in the sockaddr_in structure. Determine appropriate PORT id (needs to
** match with SERVER). Issue a connect for the socket. A refused connect is
** retried up to NUM_TRIES times with a pause in between, because the remote
** server may not be listening yet; the socket of a failed attempt is closed
** before the next one is created.
** Any unrecoverable failure is reported on stderr and ends the process with
** exit(100). */
static void setup_client_sockets(void)
{
    int i, j;
    int connect_errno = 0;
    struct hostent *hent;

    /* Establish socket communications with other partitions */
    for (i = 0; i < num_parts; i++) {
        /* Skip ourself */
        if (i == mypart () )
            continue;

        for (j = 0; j < NUM_TRIES; j++) {
            /* Create a CLIENT socket to send data */
            partition[i].socket = socket(AF_INET, SOCK_STREAM, 0);
            if (partition[i].socket < 0) {
                fprintf(stderr,
                        "PM %d CLIENT: can't open stream socket, errno=%d\n",
                        mypart(), errno);
                fflush(stderr);
                exit(100);
            }

            /* Lookup host address and place in the socket address structure */
            memset(&partition[i].addr, 0, sizeof(struct sockaddr_in));
            partition[i].addr.sin_family = AF_INET;
            if ((hent = gethostbyname(partition[i].name)) == NULL) {
                /* the name is a string; it used to be printed with %d */
                fprintf(stderr,"PM %d CLIENT: No entry for %s in /etc/hosts\n",
                        mypart(), partition[i].name);
                fflush(stderr);
                exit(100);
            }
            memcpy(&partition[i].addr.sin_addr, hent->h_addr_list[0],
                   hent->h_length);
            partition[i].addr.sin_port = htons(PM_PORT + (i << 4) +
                                               mypart());

            /* Connect to the socket */
            if (connect(partition[i].socket,
                        (struct sockaddr *)&partition[i].addr,
                        sizeof(struct sockaddr_in)) == 0) {
                /* successful connect, break out of loop */
                break;
            }

            /* Keep the reason; close() and sleep() may change errno */
            connect_errno = errno;
            /* Release the socket, a new one is created for every attempt */
            close(partition[i].socket);
            if (connect_errno == ECONNREFUSED) {
                /* unsuccessful connect, sleep and try again */
                sleep(3);
            } else {
                /* another error occurred, quit trying to connect */
                j = NUM_TRIES;
                break;
            }
        } /* endfor NUM_TRIES */

        if (j == NUM_TRIES) {
            fprintf(stderr,
                    "PM %d CLIENT: Unable to connect sock to %s, errno %d\n",
                    mypart(), partition[i].name, connect_errno);
            fflush(stderr);
            exit(100);
        }
    } /* end for setting up CLIENT sockets */
}
/* send_to_partition -- Helper of pm_client. Sends the selected message
** (buffer[next_message]) to partition "part": first the length word, then
** that many bytes starting at the beginning of the buffer slot.
** Returns 0 on success, -1 after reporting a failed send on stderr. */
static int send_to_partition(int part)
{
    int size = buffer[next_message].length;

    if (send(partition[part].socket, &size, sizeof(size), 0) < 0
        || send(partition[part].socket, &buffer[next_message], size, 0) < 0) {
        fprintf(stderr,
                "PM %d CLIENT: send to PM %d failed, errno=%d\n",
                mypart(), part, errno);
        fflush(stderr);
        return -1;
    }
    return 0;
}
/* pm_client -- The PM CLIENT process sends data to partitions.
** Send messages over the sockets: Get message. Send message (if it
** is a broadcast message send it to all partitions, if not send it to
** appropriate partition). Acknowledge sending of message. Release buffer.
** Reset next message indicator.
** Loops forever; it only returns after a send has failed. */
static void pm_client(void)
{
    int i;

    /* CLIENT- GO INTO INFINITE SENDING LOOP */
    /* Initial setting to indicate the next message has not been selected */
    next_message = -1;

    /* Forever, wait for messages to send over socket */
    for ( ; ; ) {
        /* Get the message */
        pm_getmsg();
        if (next_message < 0) {
            /* Nothing was selected; never index the buffers with -1 */
            continue;
        }

        /* Determine where to send the message */
        if (buffer[next_message].dnode == -1) {
            /* BROADCAST MESSAGE, SEND TO ALL PARTITIONS */
            for (i = 0; i < numparts(); i++) {
                /* Don't send broadcast to self */
                if (i == mypart () )
                    continue;
                if (send_to_partition(i) < 0)
                    return;
            } /* endfor SEND BROADCAST TO ALL PARTITIONS */
        } else {
            /* SEND TO A SPECIFIC PARTITION */
            if (send_to_partition(partof(buffer[next_message].dnode)) < 0)
                return;
        }

        /* FOR BOTH BROADCAST AND REGULAR MESSAGES */
        /* acknowledge the sending of the message */
        buffer[next_message].valid[mybuffer()] = 0;
        /* release (free) the buffer */
        semcall_one(msgfree, next_message, 1);
        /* reset next_message so the next getmsg will work */
        next_message = -1;
    } /* end forever CLIENT PROCESS sending messages over socket */
}
/***** Initialization and Termination routines *****/
/* abort_prog -- Release what this process holds after an error: the
** semaphore sets and the shared memory id are removed only when
** pmserver_pid is nonzero, the shared memory is always detached, the sockets
** of all other partitions are closed, and pending output is flushed. */
static void abort_prog(void)
{
    const int is_server = (pmserver_pid != 0);
    int part;

    /* Semaphore sets: removed by the PM SERVER process only */
    if (is_server && msgavail != -1) {
        semctl(msgavail, 0, IPC_RMID, 0);
        msgavail = -1;
    }
    if (is_server && msgfree != -1) {
        semctl(msgfree, 0, IPC_RMID, 0);
        msgfree = -1;
    }

    /* Detach the shared memory in every process */
    if (buffer != NULL) {
        shmdt(buffer);
        buffer = NULL;
    }

    /* Shared memory id: removed by the PM SERVER process only */
    if (is_server && shmid_m != -1) {
        shmctl(shmid_m, IPC_RMID, 0);
        shmid_m = -1;
    }

    /* Close the sockets of every partition but our own */
    for (part = 0; part < num_parts; part++) {
        if (part == mypart())
            continue;
        close (partition[part].socket);
        partition[part].socket = 0;
    }

    /* Push out whatever output is still buffered */
    fflush(stdout);
    fflush(stderr);
}
/* sig_terminate -- Handler for the termination signals installed in pm().
** When pmserver_pid is nonzero (the PM SERVER process) it forwards SIGTERM
** to every recorded child and to the process whose pid is in pmserver_pid.
** It then cleans up through abort_prog() and exits with status 100.
** The parameters are not used. */
void sig_terminate(int sig, int code, struct sigcontext *scp)
{
    int i;

    /* Send termination signal to each of PM SERVER's children */
    if (pmserver_pid != 0) {
        for (i = 0; i < child_index; i++) {
            kill(children[i], SIGTERM);
        }
        child_index = 0;
        /* pmserver_pid holds -1 after a failed fork(); kill(-1, ...) would
        ** signal every process we are allowed to signal. */
        if (pmserver_pid > 0) {
            kill(pmserver_pid, SIGTERM);
        }
    }
    /* Clean up the use of semaphores and shared memory */
    abort_prog();
    exit(100);
}
/* unexpected_death -- SIGCHLD handler installed in pm(). When pmserver_pid
** is nonzero it collects one child with wait() and prints a line if that
** child was ended or stopped by a signal; other outcomes are not reported.
** stdout is flushed in every case. The parameters are not used. */
void unexpected_death(int sig, int code, struct sigcontext *scp)
{
    int status;
    int dead_pid;

    /* Only PM SERVER process should look at its children */
    if (pmserver_pid == 0) {
        fflush(stdout);
        return;
    }

    dead_pid = wait(&status);
    if (dead_pid < 0) {
        printf("Error determining who died unexpectedly. Errno=%d\n", errno);
    } else if (WIFSIGNALED(status) != 0) {
        printf("Process %d did not catch signal %d.\n",
               dead_pid, WTERMSIG(status));
    } else if (WIFSTOPPED(status) != 0) {
        printf("Process %d stopped due to signal %d.\n",
               dead_pid, WSTOPSIG(status));
    }
    /* Any other status is deliberately left unreported */
    fflush(stdout);
}
/* _killcube -- On abort, kill off all children on the hypercube partition.
** When pmserver_pid is nonzero, SIGTERM is sent to every recorded child and
** to the process whose pid is in pmserver_pid, and up to child_index + 1
** children are collected with wait(), reporting those that were ended or
** stopped by a signal. abort_prog() is then called in every case.
** Both parameters are unused. Always returns 0. */
int _killcube(int node, int pid)
{
    int i;
    int statval;
    int dead_pid;

    /* Only PM SERVER process should execute this code */
    if (pmserver_pid != 0) {
        for (i = 0; i < child_index; i++) {
            kill(children[i], SIGTERM);
        }
        /* pm() calls us with pmserver_pid == -1 when its fork() failed;
        ** kill(-1, ...) would signal every process we are allowed to
        ** signal, so only a real pid is signalled. */
        if (pmserver_pid > 0) {
            kill(pmserver_pid, SIGTERM);
        }
        /* One wait per node child plus one for the forked PM process */
        for (i = 0; i <= child_index; i++) {
            dead_pid = wait(&statval);
            if (dead_pid < 0) {
                /* No more children left */
                break;
            }
            if (WIFSIGNALED(statval) != 0) {
                printf("Process %d did not catch signal %d.\n",
                       dead_pid, WTERMSIG(statval));
            } else if (WIFSTOPPED(statval) != 0) {
                printf("Process %d stopped due to signal %d.\n",
                       dead_pid, WSTOPSIG(statval));
            }
            /* Any other status is deliberately left unreported */
        }
    }
    /* Clean up after ourself */
    abort_prog();
    child_index = 0;
    return 0;
}
/* init_shared_mem -- Allocates a shared memory region. Sets pointer to region
** in this process's memory space and returns the shared memory identifier.
** pointer: receives the attached address.
** size:    size of the region in bytes.
** key:     IPC key of the region; it is created when it does not exist.
** If the region cannot be obtained or attached the failure is printed,
** _killcube() is called and the process exits with -1. */
static int init_shared_mem(void **pointer, int size, int key)
{
    int shmid;

    if ((shmid = shmget(key, size, 0666 | IPC_CREAT)) < 0) {
        printf("init_shm: allocation of shared memory failed. Errno=%d\n",errno);
        printf(" mynode=%d key=%d size=%d\n",my_node,key,size);
        _killcube(0,0);
        exit(-1);
    }
    *pointer = shmat(shmid, NULL, 0);
    /* shmat() reports failure with (void *) -1, not with NULL */
    if (*pointer == (void *) -1) {
        printf("init_shm: attach of shared memory failed. Errno=%d\n",errno);
        printf(" mynode=%d key=%d size=%d\n",my_node,key,size);
        /* Do not hand the invalid address to anyone */
        *pointer = NULL;
        _killcube(0,0);
        exit(-1);
    }
    return shmid;
}
/* init_semaphore -- Obtains a set of "size" semaphores for "key" (creating
** it when needed), stores its id in *semid and sets every member to "value".
** Returns the id. A failure is printed, _killcube() is called and the
** process exits with -1. */
static int init_semaphore(int *semid, int size, int value, int key)
{
    int member = 0;

    *semid = semget(key, size, 0666 | IPC_CREAT);
    if (*semid < 0) {
        printf("init_sem: allocation of semaphores failed. Errno=%d\n",errno);
        printf(" mynode=%d key=%d size=%d\n",my_node,key,size);
        _killcube(0,0);
        exit(-1);
    }

    /* Give every member of the set its starting value */
    while (member < size) {
        if (semctl(*semid, member, SETVAL, value) < 0) {
            printf("init_sem: init of semaphores failed. Errno=%d\n",errno);
            printf(" mynode=%d offset=%d value=%d\n",my_node,member,value);
            _killcube(0,0);
            exit(-1);
        }
        member++;
    }
    return *semid;
}
/* semcall_all -- Applies "operation" to members 0 .. size-1 of semaphore set
** "semid" in a single semop() call. An interrupted call is repeated.
** Returns 0 on success, -1 (after printing errno) on any other failure.
** The local operation array holds NUMBER_IN_PART+1 entries. */
static int semcall_all(int semid, int size, int operation)
{
    struct sembuf ops[NUMBER_IN_PART+1];
    struct sembuf *op;
    int member;

    for (member = 0, op = ops; member < size; member++, op++) {
        op->sem_num = member;
        op->sem_op = operation;
        op->sem_flg = 0;
    }

    for ( ; ; ) {
        if (semop(semid, ops, size) >= 0)
            return 0;
        /* Anything but an interruption is final */
        if (errno != EINTR) {
            printf("PM %d: Semaphore broadcast failed. Errno = %d\n",
                   mypart(), errno);
            fflush(stdout);
            return -1;
        }
    }
}
/* semcall_one -- Applies "operation" to member "num" of semaphore set
** "semid". An interrupted call is repeated. Returns 0 on success, -1 (after
** printing errno) on any other failure. */
static int semcall_one(int semid, int num, int operation)
{
    struct sembuf op;

    op.sem_flg = 0;
    op.sem_op = operation;
    op.sem_num = num;

    for ( ; ; ) {
        if (semop(semid, &op, 1) >= 0)
            return 0;
        /* Anything but an interruption is final */
        if (errno != EINTR) {
            printf("PM %d: Semaphore failed. Errno = %d\n", mypart(), errno);
            fflush(stdout);
            return -1;
        }
    }
}
/***** Environment Information (External and Internal) *****/
/* numnodes -- Reports the value of num_nodes, the total number of simulated
** nodes in all partitions. */
int numnodes(void)
{
    const int total_nodes = num_nodes;

    return total_nodes;
}
/* numparts -- Reports the value of num_parts, the number of partitions. */
static int numparts(void)
{
    const int partition_count = num_parts;

    return partition_count;
}
/* mypart -- Reports the value of my_part, the partition this process is in. */
static int mypart(void)
{
    const int own_partition = my_part;

    return own_partition;
}
/* partof -- Maps a node number to its partition: the host node maps to
** partition 0, any other node n to n / NUMBER_IN_PART. */
static int partof(int n)
{
    return (n == myhost()) ? 0 : (n / NUMBER_IN_PART);
}
/* pm_partof -- Like partof, but a broadcast destination is passed through:
** the host node maps to partition 0, -1 maps to -1, any other node n maps
** to n / NUMBER_IN_PART. */
static int pm_partof(int n)
{
    if (n == myhost())
        return 0;
    if (n == -1)
        return -1;
    return n / NUMBER_IN_PART;
}
/* numbuffers -- Number of buffers in this partition: one per node plus one,
** and one more than that on partition 0. */
static int numbuffers(void)
{
    const int extra = (mypart() == 0) ? 2 : 1;

    return (nodes_in_part + extra);
}
/* mybuffer -- Index of this process's buffer, which is nodes_in_part. */
static int mybuffer(void)
{
    const int own_index = nodes_in_part;

    return own_index;
}
/* bufferof -- Buffer index used for node n. A node of another partition
** maps to index nodes_in_part (the PM's buffer), the host maps to
** nodes_in_part + 1, and a node of this partition to n % NUMBER_IN_PART. */
static int bufferof(int n)
{
    /* Not ours: goes through the buffer of the PM */
    if (partof(n) != mypart())
        return nodes_in_part;

    /* This partition, buffer of host */
    if (n == myhost())
        return nodes_in_part + 1;

    /* This partition, buffer of node */
    return n % NUMBER_IN_PART;
}
/* mynode -- Reports the value of my_node, the node number of this process. */
int mynode(void)
{
    const int own_node = my_node;

    return own_node;
}
/* myhost -- Node number of the host, which is the value numnodes() returns. */
int myhost(void)
{
    const int host_node = numnodes();

    return host_node;
}
/***** Communications *****/
/* pm_getmsg -- Wait until a message is available for the PM's buffer index
** and select it. Nothing is done when a message is already selected.
** After the "message available" semaphore of this buffer index has been
** taken, the buffers are scanned for one whose valid flag for this index is
** set. If none is flagged the wait is repeated, so the function never
** returns without a selection. If the semaphore operation fails the process
** cleans up through _killcube() and exits with -1.
** OUTPUT: next_message - index of the buffer that holds the message */
static void pm_getmsg(void)
{
    int i;

    /* Only wait if a message is not already selected */
    if (next_message != -1) return;

    for ( ; ; ) {
        /* Wait for a message for me */
        if (semcall_one(msgavail, mybuffer(), -1) < 0) {
            /* The semaphore set is unusable and semcall_one has already
            ** printed the error; carrying on would leave next_message at
            ** -1 for the caller to index with. */
            _killcube(0,0);
            exit(-1);
        }
        /* Search for those messages that are for me */
        for (i = 0; i < numbuffers(); i++) {
            if (buffer[i].valid[mybuffer()] != 0) {
                next_message = i;
                return;
            }
        }
        /* Woken up but nothing is flagged: wait again */
    }
}
[LISTING THREE]
/***** simulate.c *****/
/* These functions allow a UNIX system to simulate a hypercube environment. */
#include <stdio.h>
#include <ctype.h>
#include <sys/types.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/signal.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <sys/in.h>
#include <netdb.h>
#include "cube.h"
/* Prototypes */
/* Library routines declared by hand */
char *getenv(char *variable);
void *shmat(int shmid, void *shmaddr, int shmflg);
char *strtok(char *, char *);
char *strcpy(char *, char *);
void *malloc(int size);
/* Smaller of two values; each argument may be evaluated twice */
#define min(x,y) (((x) < (y)) ? (x) : (y))
/* Message passing */
int csend(int type, void *msg, int length, int target_node, int group);
int crecv(int type, void *buf, int len);
/* Termination */
int killcube(int, int);
/* Environment information */
int numnodes(void);
int myhost(void);
int mynode(void);
int numparts(void);
int numbuffers(void);
int mybuffer(void);
int bufferof(int node);
int mypart(void);
int partof(int node);
/* Local, Private Information */
/* File-level state of the simulation library; every item is private to */
/* this file. */
static int num_parts; /* number of partitions */
static int my_part; /* partition this process is in */
static int nodes_in_part; /* number of nodes in this partition */
static subpart *partition = NULL; /* list of partition information */
static int base; /* base key value for allocating shared data */
static int my_node; /* node number for this process */
static int my_group; /* group id for this process */
/* There are two groups, host communications */
/* and inter-node communications */
static int num_nodes; /* total number of nodes in all partitions */
static int msgavail = -1; /* semaphores indicating message is available */
static int msgfree = -1; /* semaphores indicating buffer is free */
static int next_message; /* which message is to be received next */
static int shmid_m = -1; /* id of shared area for messages */
static message *buffer = NULL;/* communication areas */
static int *children = NULL; /* process ids of all child processes */
static int child_index = 0; /* number of children created */
/***** Initialization and Termination routines *****/
/* abort_prog -- Clean up when the program terminates. The semaphore sets and
** the shared memory id are removed only when this process is the host
** (mynode() equals myhost()); the shared memory is detached in every
** process; pending output is flushed. */
void abort_prog(void)
{
    int on_host;

    /* Semaphore sets: removed by the host only */
    on_host = (mynode() == myhost());
    if (on_host && msgavail != -1) {
        semctl(msgavail, 0, IPC_RMID, 0);
        msgavail = -1;
    }
    if (on_host && msgfree != -1) {
        semctl(msgfree, 0, IPC_RMID, 0);
        msgfree = -1;
    }

    /* Detach the shared memory */
    if (buffer != NULL) {
        shmdt(buffer);
        buffer = NULL;
    }

    /* Shared memory id: removed by the host only (checked again here) */
    on_host = (mynode() == myhost());
    if (on_host && shmid_m != -1) {
        shmctl(shmid_m, IPC_RMID, 0);
        shmid_m = -1;
    }

    /* Push out whatever output is still buffered */
    fflush(stdout);
    fflush(stderr);
}
/* sig_terminate -- Termination signal handler. A node process cleans up its
** semaphores and shared memory through abort_prog(); the host process calls
** killcube() to pass the termination on to the node processes. Either way
** the process then exits with status 100. The parameters are not used. */
void sig_terminate(int sig, int code, struct sigcontext *scp)
{
    if (mynode() != myhost()) {
        /* Node process: release our own resources */
        abort_prog();
    } else {
        /* Host process: take the node processes down */
        killcube(0,0);
    }
    exit(100);
}
/* unexpected_death -- Handle unexpected termination signals. Used by the
** host process. Collects one child with wait() and prints a line if that
** child was ended or stopped by a signal; other outcomes are not reported.
** stdout is flushed afterwards. The parameters are not used. */
void unexpected_death(int sig, int code, struct sigcontext *scp)
{
    int status;
    int dead_pid = wait(&status);

    if (dead_pid < 0) {
        printf("Error determining who died unexpectedly. Errno=%d\n", errno);
    } else if (WIFSIGNALED(status) != 0) {
        printf("Process %d did not catch signal %d.\n",
               dead_pid, WTERMSIG(status));
    } else if (WIFSTOPPED(status) != 0) {
        printf("Process %d stopped due to signal %d.\n",
               dead_pid, WSTOPSIG(status));
    }
    /* Any other status is deliberately left unreported */
    fflush(stdout);
}
/* handler -- Placeholder for the hypercube specific error hook, which has no
** UNIX counterpart. Both arguments are accepted and ignored. */
void handler(int type, void (*proc)())
{
    /* Nothing to do in the simulation */
    (void) type;
    (void) proc;
}
/* getcube -- Called by host process to gain possession of a partition in a
** hypercube. Note: Assuming getcube is only called once per host process.
**
** Only cubetype is examined: an optional leading 'd' followed by up to four
** decimal digits. Without the 'd' the digits are the node count; with it they
** are the cube dimension and the node count becomes 1 << dimension.
** num_nodes is set to NUMBER_IN_PART when cubetype is NULL, holds no digits,
** or names a dimension too large to fit in an int. */
void getcube(char *cubename, char *cubetype, char *srmname, int keep,
             char *account)
{
    char size[8];
    int is_dimension = 0;
    int requested = 0;
    int i;
    char *ptr;
    char *target;

    /* The rest of the parameters don't matter */
    (void)cubename;
    (void)srmname;
    (void)keep;
    (void)account;

    num_nodes = NUMBER_IN_PART; /* default size */
    if (cubetype == NULL) {
        return;
    }

    /* Pull out the requested number of nodes */
    ptr = cubetype;
    if (*ptr == 'd') {
        ptr++;
        is_dimension = 1;
    }
    target = size;
    /* isdigit() requires a value representable as unsigned char */
    for (i = 0; (i < 4) && isdigit((unsigned char)*ptr); i++) {
        *target++ = *ptr++;
    }
    *target = '\0';

    /* No digits at all: stay with the default */
    if (sscanf(size, "%d", &requested) != 1) {
        return;
    }

    if (is_dimension) {
        /* Shifting into or past the sign bit is undefined behaviour */
        if (requested >= (int)(sizeof(int) * 8) - 1) {
            fprintf(stderr, "getcube: dimension %d is too large\n", requested);
            fflush(stderr);
            return;
        }
        requested = 1 << requested;
    }
    num_nodes = requested;
}
/* cubeinfo -- Passes back information about the partitions on a hypercube.
** The "global" argument selects the scope on a real machine:
**   0  the currently attached cube
**   1  all cubes owned and allocated by the current host
**   2  all cubes on the system the command was executed from
**   3  how cubes are allocated on all SRMs
**   4  one additional parameter (srmname): information for that SRM
** Not implemented in the simulator: nothing is written to ct and the
** number of cubes reported is always 0. */
int cubeinfo(struct cubetable *ct, int numslots, int global, ...)
{
    const int cubes_reported = 0;

    (void)ct;
    (void)numslots;
    (void)global;
    return cubes_reported;
}
/* relcube -- Release the cube gained by the getcube call.
** Not implemented in the simulator: the call is accepted and ignored. */
void relcube(char *cubename)
{
    (void)cubename;
}
/* killcube -- On abort, kill off all processes in the hypercube partition.
** Sends SIGTERM to every recorded child, waits for them to end (reporting
** any that were killed or stopped by a signal), then releases this process's
** IPC resources and forgets the children. Both arguments are ignored.
** Always returns 0. */
int killcube(int node, int pid)
{
    int n;
    int status;
    int reaped;

    /* Ask every child to terminate */
    for (n = 0; n < child_index; n++) {
        kill(children[n], SIGTERM);
    }

    /* Allow the children a moment to act on the signal */
    if (child_index > 0) sleep(1);

    /* Collect one exit status per child, stopping early if none are left */
    n = 0;
    while (n < child_index) {
        reaped = wait(&status);
        if (reaped < 0) {
            break;
        }
        if (WIFSIGNALED(status) != 0) {
            printf("Process %d did not catch signal %d.\n",
                   reaped, WTERMSIG(status));
        } else if (WIFSTOPPED(status) != 0) {
            printf("Process %d stopped due to signal %d.\n",
                   reaped, WSTOPSIG(status));
        }
        /* a plain exit needs no report */
        n++;
    }

    /* Release our own resources */
    abort_prog();
    child_index = 0;
    return 0;
}
/* init_shared_mem -- Allocates a shared memory region. Sets pointer to region
** in this process's memory space and returns the shared memory identifier.
** The segment is created if it does not exist yet (IPC_CREAT, mode 0666).
** On any failure a diagnostic is printed and sig_terminate() ends the
** process, so this function returns only on success. */
static int init_shared_mem(void **pointer, int size, int key)
{
    int shmid;
    void *region;

    if ((shmid = shmget(key, size, 0666 | IPC_CREAT)) < 0) {
        printf("init: allocation of shared memory failed. Errno=%d\n",errno);
        printf(" mynode=%d key=%d size=%d\n",my_node,key,size);
        fflush(stdout);
        sig_terminate(0,0,NULL);
    }

    /* shmat() reports failure as (void *)-1, not NULL */
    region = shmat(shmid, NULL, 0);
    if (region == (void *)-1) {
        printf("init: attach of shared memory failed. Errno=%d\n",errno);
        printf(" mynode=%d key=%d size=%d\n",my_node,key,size);
        fflush(stdout);
        /* Never hand the failure value on as if it were a mapping */
        *pointer = NULL;
        sig_terminate(0,0,NULL);
    }
    *pointer = region;
    return shmid;
}
/* init_semaphore -- Allocates a set of semaphores and initializes them.
** Creates (or opens) a set of "size" semaphores under "key", stores its id in
** *semid and sets every member to "value". On failure a diagnostic is
** printed and sig_terminate() is called. Returns the set id. */
static int init_semaphore(int *semid, int size, int value, int key)
{
    int member;

    *semid = semget(key, size, 0666 | IPC_CREAT);
    if (*semid < 0) {
        printf("init: allocation of semaphores failed. Errno=%d\n",errno);
        printf(" mynode=%d key=%d size=%d\n",my_node,key,size);
        fflush(stdout);
        sig_terminate(0,0,NULL);
    }

    member = 0;
    while (member < size) {
        if (semctl(*semid, member, SETVAL, value) < 0) {
            printf("init: initialization of semaphores failed. Errno=%d\n",errno);
            printf(" mynode=%d offset=%d value=%d\n",my_node,member,value);
            fflush(stdout);
            sig_terminate(0,0,NULL);
        }
        member++;
    }
    return *semid;
}
/* semcall_all -- Perform same operation on all elements of a semaphore.
** Applies "operation" to members 0 .. size-1 of set "semid" in a single
** semop() call, retrying when interrupted by a signal. "size" must not exceed
** NUMBER_IN_PART+1, the capacity of the local operation array. Any other
** failure prints a diagnostic, cleans up and exits. Returns 0. */
static int semcall_all(int semid, int size, int operation)
{
    struct sembuf sbuf[NUMBER_IN_PART+1];
    register int i;

    /* Refuse a count that would write past the end of sbuf */
    if ((size < 0) || (size > NUMBER_IN_PART+1)) {
        printf("%d: Semaphore broadcast size %d out of range\n",mynode(),size);
        abort_prog();
        exit(-1);
    }

    for (i = 0; i < size; i++) {
        sbuf[i].sem_num = i;
        sbuf[i].sem_op = operation;
        sbuf[i].sem_flg = 0;
    }
    while (semop(semid, sbuf, size) < 0) {
        /* repeat operation if interrupted */
        if (errno != EINTR) {
            printf("%d: Semaphore broadcast failed. Errno = %d\n",mynode(),errno);
            abort_prog();
            exit(-1);
        }
    }
    return 0;
}
/* semcall_one -- Perform an operation on an element of a semaphore.
** Applies "operation" to member "num" of set "semid", retrying when the call
** is interrupted by a signal. Any other failure prints a diagnostic, cleans
** up and exits. Returns 0. */
static int semcall_one(int semid, int num, int operation)
{
    struct sembuf op;

    op.sem_num = num;
    op.sem_op = operation;
    op.sem_flg = 0;

    for (;;) {
        if (semop(semid, &op, 1) >= 0) {
            break;
        }
        if (errno == EINTR) {
            continue; /* interrupted: try again */
        }
        printf("%d: Semaphore failed. Errno = %d\n",mynode(), errno);
        abort_prog();
        exit(-1);
    }
    return 0;
}
/* setpid -- Assigns a partition identifier to the simulated partition.
** Records id as this process's group number (the value mypid() reports).
** Always returns 0. */
int setpid(int id)
{
    my_group = id;

    return 0;
}
/* sim_info_int -- Take the next comma-separated field of the SIM_INFO copy
** and read it as a decimal integer into *out. Follows the strtok()
** convention: pass the buffer on the first call and NULL afterwards. When no
** field is left, missing_msg is written to stderr and the process exits with
** -1. A field that is not a number leaves *out unchanged. */
static void sim_info_int(char *str, const char *missing_msg, int *out)
{
    char *field = strtok(str, ",");

    if (field == NULL) {
        fputs(missing_msg, stderr);
        fflush(stderr);
        exit(-1);
    }
    sscanf(field, "%d", out);
}

/* init_simulator -- Should be called near the beginning of an application
** before any hypercube-related functions are called.
** Installs the termination handlers, reads "base,node,group,nodes" from the
** SIM_INFO environment variable, derives the partition layout, then attaches
** the shared message buffers and the two communication semaphore sets.
** Exits with -1 when SIM_INFO is missing, too long or incomplete. */
void init_simulator(void)
{
    int remaining;
    char *info;
    static char env[256]; /* must be static */
    void *region = NULL;

    /* parent cm will send child cm SIGINT when a CTRL-BREAK is pressed */
    signal(SIGINT,sig_terminate);
    signal(SIGTERM,sig_terminate);
    signal(SIGQUIT,sig_terminate);

    /* Pick up the base key value from the environment */
    if ((info = getenv("SIM_INFO")) == NULL) {
        fprintf(stderr,"init_sim: Missing environment variable\n");
        fflush(stderr);
        exit(-1);
    }
    /* strtok() modifies its input, so work on a copy -- but never overrun it */
    if (strlen(info) >= sizeof(env)) {
        fprintf(stderr,"init_sim: Environment variable too long\n");
        fflush(stderr);
        exit(-1);
    }
    strcpy(env,info);

    sim_info_int(env,
                 "init_sim: Missing information in environment variable\n",
                 &base);
    sim_info_int(NULL,
                 "init_sim: Missing node info in environment variable\n",
                 &my_node);
    sim_info_int(NULL,
                 "init_sim: Missing pid info in environment variable\n",
                 &my_group);
    sim_info_int(NULL,
                 "init_sim: Missing number of node info in environment variable\n",
                 &num_nodes);

    num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;
    my_part = my_node / NUMBER_IN_PART;

    /* Calculate the number of nodes in this and remaining partitions */
    remaining = numnodes() - (mypart() * NUMBER_IN_PART);
    nodes_in_part = min(NUMBER_IN_PART, remaining);

    /* Allocate shared memory; go through a void * so the pointer types match */
    shmid_m = init_shared_mem(&region, sizeof(message) * numbuffers(), base);
    buffer = region;

    /* Allocate communications semaphores */
    init_semaphore(&msgavail, numbuffers(), 0, base+10000);
    init_semaphore(&msgfree, numbuffers(), 0, base+20000);
}
/* exec_partition_manager -- Runs in a freshly forked child: builds the
** argument list for Partition Manager number "part" and replaces the process
** image. Partition 0 runs "pm" directly; every other partition runs it
** through "rsh" on that partition's host. The arguments to pm are: filename,
** partition number, base key, group id, node count, then every partition
** host name. Never returns; exits with -1 if the exec fails. */
static void exec_partition_manager(char *filename, int part, int group_id)
{
    char partition_number[20];
    char base_string[20];
    char group_string[20];
    char number_of_nodes[20];
    char **argv;
    int n = 0;
    int k;

    /* Worst case: "rsh", host, "pm", filename, 4 numbers, all names, NULL */
    argv = malloc((num_parts + 9) * sizeof(char *));
    if (argv == NULL) {
        printf("load: insufficient memory to start PM %d\n", part);
        fflush(stdout);
        exit(-1);
    }

    sprintf(partition_number, "%d", part);
    sprintf(base_string, "%d", base);
    sprintf(group_string, "%d", group_id);
    sprintf(number_of_nodes, "%d", numnodes());

    if (part != 0) {
        argv[n++] = "rsh";
        argv[n++] = partition[part].name;
    }
    argv[n++] = "pm";
    argv[n++] = filename;
    argv[n++] = partition_number;
    argv[n++] = base_string;
    argv[n++] = group_string;
    argv[n++] = number_of_nodes;
    for (k = 0; k < num_parts; k++) {
        argv[n++] = partition[k].name;
    }
    argv[n] = NULL;

    execvp(argv[0], argv);
    /* If we get here, we had a problem */
    printf("execvp of PM %d failed. errno=%d\n", part, errno);
    fflush(stdout);
    exit(-1);
}

/* load -- Should be called near the beginning of a host application before any
** hypercube-related functions are called, except for getcube. It will start
** the appropriate number of PMs on the appropriate systems (as read from the
** .pmrc file.) Parent process will be node 0, which has special roles on a
** hypercube. Remaining processes will be numbered consecutively.
**
** filename is handed to each PM as the program to run and group_id as its
** group; which_node is not used by the simulator.
** Returns 0 once every Partition Manager has been forked, or -1 on failure
** (out of memory, missing ".pmrc", or fork failure). */
int load(char *filename, int which_node, int group_id)
{
    int i, pid, size;
    char temp[256];
    char *ptr;
    FILE *fd;

    (void)which_node;

    /* Allocate space for child pids */
    if (children == NULL) {
        if ((children = malloc(numnodes() * sizeof(int))) == NULL) {
            fprintf(stderr,"load: insufficient memory\n");
            fflush(stderr);
            return -1;
        }
    }

    /* parent will send us SIGINT when CTRL-BREAK is pressed */
    signal(SIGINT,sig_terminate);
    signal(SIGTERM,sig_terminate);
    signal(SIGQUIT,sig_terminate);
    signal(SIGCHLD, unexpected_death);

    base = getpid();
    num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;

    /* Read one host name per partition from the configuration file */
    if (partition == NULL) {
        if ((partition = malloc(num_parts * sizeof(subpart))) == NULL) {
            fprintf(stderr,"load: insufficient memory\n");
            fflush(stderr);
            return -1;
        }
        memset(partition, 0, num_parts * sizeof(subpart));
        if ((fd = fopen(".pmrc","r")) == NULL) {
            fprintf(stderr,"load: Missing configuration file \".pmrc\"\n");
            fflush(stderr);
            /* Do not leave a half-built table behind */
            free(partition);
            partition = NULL;
            return -1;
        }
        for (i = 0; i < num_parts; i++) {
            temp[0] = '\0';
            /* Width keeps an over-long name from overrunning temp */
            fscanf(fd," %255[^ \n] \n",temp);
            size = strlen(temp);
            if ((ptr = malloc(size+1)) == NULL) {
                fprintf(stderr,"load: Insufficent memory\n");
                fflush(stderr);
                /* Release the file and everything allocated so far */
                fclose(fd);
                while (i-- > 0) {
                    free(partition[i].name);
                }
                free(partition);
                partition = NULL;
                return -1;
            }
            strcpy(ptr,temp);
            partition[i].name = ptr;
        }
        fclose(fd);
    }

    /* Host program's node number is the same as the number of nodes */
    my_node = numnodes();
    my_part = 0;

    /* Calculate the number of nodes in this and remaining partitions */
    i = numnodes() - (mypart() * NUMBER_IN_PART);
    nodes_in_part = min(NUMBER_IN_PART, i);

    /* Allocate shared memory; go through a void * so the pointer types match */
    if (shmid_m == -1) {
        void *region = NULL;
        shmid_m = init_shared_mem(&region, sizeof(message) * numbuffers(),base);
        buffer = region;
    }
    memset(buffer,0,sizeof(message) * numbuffers());

    /* Allocate communications semaphores */
    if (msgavail == -1) {
        init_semaphore(&msgavail, numbuffers(), 0, base+10000);
    }
    if (msgfree == -1) {
        init_semaphore(&msgfree, numbuffers(), 0, base+20000);
    }

    /* Flush first so buffered output is not duplicated in the children */
    fflush(stdout);
    fflush(stderr);

    /* Start the local and remote Partition Managers */
    for (i = 0; i < num_parts; i++) {
        if ((pid = fork()) < 0) {
            /* Can't create all the children! */
            killcube(0,0);
            fprintf(stderr, "LOAD: unable to create Partition Managers\n");
            return -1;
        } else if (pid == 0) {
            /* I'm the child process */
            my_node = -1;
            exec_partition_manager(filename, i, group_id); /* never returns */
        } else {
            /* I'm the parent process */
            children[child_index++] = pid;
        }
    }
    return 0;
}
/** Environment Information (External and Internal) **/
/* availmem -- returns amount of memory available.
** The simulator does not track memory and always reports 0. */
int availmem(void)
{
    const int bytes_available = 0;

    return bytes_available;
}
/* nodedim -- Returns the dimension of the simulated hypercube: the smallest
** d for which a cube of 1 << d nodes holds num_nodes nodes (8 nodes -> 3,
** 5 nodes -> 3, 1 node -> 0). Returns 0 when the node count is not positive. */
int nodedim(void)
{
    unsigned int remaining;
    int dim = 0;

    if (num_nodes <= 0) {
        return 0;
    }
    /* Count the bits needed to represent the highest node number */
    remaining = (unsigned int)num_nodes - 1;
    while (remaining != 0) {
        remaining >>= 1;
        dim++;
    }
    return dim;
}
/* numnodes -- Returns the number of simulated nodes (the num_nodes global). */
int numnodes(void)
{
    return (num_nodes);
}
/* numparts -- number of simulator partitions (the num_parts global). */
static int numparts(void)
{
    return (num_parts);
}
/* mypart -- Simulator partition this process is in (the my_part global). */
static int mypart(void)
{
    return (my_part);
}
/* partof -- Determines which simulator partition a given node is member of.
** The host counts as a member of partition 0; any other node n belongs to
** partition n / NUMBER_IN_PART. */
static int partof(int n)
{
    return (n == myhost()) ? 0 : (n / NUMBER_IN_PART);
}
/* numbuffers -- Number of buffers in this simulator partition */
static int numbuffers(void)
{
if (mypart() == 0) {
return nodes&us.in&us.part + 2;
} else {
return nodes&us.in&us.part + 1;
}
}
/* mybuffer -- returns the index for this process's buffer */
static int mybuffer(void)
{
if (mynode() == myhost()) {
return nodes&us.in&us.part+1;
} else {
return mynode() % NUMBER_IN_PART;
}
}
/* bufferof -- Returns the buffer offset of the given node. The host is always
** the last buffer in partition 0. The PM is always second to last buffer.
** A node outside this partition maps to the PM's buffer. */
static int bufferof(int n)
{
    /* Another partition: use the buffer of the PM */
    if (mypart() != partof(n)) {
        return nodes_in_part;
    }
    /* This partition, and n is the host */
    if (n == myhost()) {
        return nodes_in_part + 1;
    }
    /* This partition, ordinary node */
    return n % NUMBER_IN_PART;
}
/* mynode -- Returns the node number for this process (the my_node global). */
int mynode(void)
{
    return (my_node);
}
/* mypid -- Returns the group number (the my_group global). */
int mypid(void)
{
    return (my_group);
}
/* myhost -- Returns the node number of the host, which is one past the
** highest node number: the same value as numnodes(). */
int myhost(void)
{
    const int host_node = numnodes();

    return host_node;
}
/** Communications **/
/* cread -- Special read for files on hypercube's high-speed disk system.
** The simulator issues a standard read() and passes its result back. */
int cread(int fd, void *buffer, int size)
{
    int got;

    got = read(fd, buffer, size);
    return got;
}
/* gdsum -- Sum individual elements of an array on all processes */
void gdsum(double x[], long elements, double work[])
{
register int i,j;
double temp;
if ((mybuffer()) == 0) {
/* The first node in each partition sums the local data */
if (nodes_in_part > 1) {
/* Only sum when we aren't the only ones in the partition */
for (i = 1; i < nodes_in_part; i++) {
/* Get the next set of numbers to sum */
crecv(-2, work, elements * sizeof(double));
for (j = 0; j < elements; j++) {
x[j] += work[j];
}
}
}
/* Node 0 sums for all partitions */
if (mynode() == 0) {
/* Only sum if there are more than one partition */
if (numparts() > 1) {
for (i = 1; i < numparts(); i++) {
/* Get the next set of numbers to sum */
crecv(-3, work, elements * sizeof(double));
for (j = 0; j < elements; j++) {
x[j] += work[j];
}
}
}
/* Only broadcast if there is more than one node */
if (nodes_in_part > 1) {
/* Broadcast the results */
csend(-4,x,elements * sizeof(double),-1,mypid());
}
} else {
/* Each partition needs to send the partial sum to node 0 */
csend(-3,x,elements * sizeof(double),0,mypid());
/* Wait for the answer */
crecv(-4,x,elements * sizeof(double));
}
} else {
/* Send the data to local node to do the summation */
csend(-2,x,elements * sizeof(double),mypart()*4,mypid());
/* Wait for the answer */
crecv(-4,x,elements * sizeof(double));
}
}
/* getmsg -- Wait until a message is available. OUTPUT: next_message, set to
** the buffer index of the first message flagged for this process and
** addressed to it (or broadcast, dnode == -1). Returns at once when a
** message is already selected. */
static void getmsg(void)
{
    int slot;

    /* Only wait if a message is not already selected */
    if (next_message == -1) {
        /* Block until something has been posted for me */
        semcall_one(msgavail, mybuffer(), -1);

        /* Pick the first buffer that is flagged and addressed to me */
        for (slot = 0; slot < numbuffers(); slot++) {
            if (buffer[slot].valid[mybuffer()] != 1) {
                continue;
            }
            if ((buffer[slot].dnode == mynode()) || (buffer[slot].dnode == -1)) {
                next_message = slot;
                break;
            }
        }
    }
}
/* cprobe -- Wait until a message of a specific type is available. OUTPUT:
** next_message, set to the message found of the proper type.
** A type of -1 accepts any message (see getmsg). Messages of other types met
** while searching are marked 2 so they are skipped, and are marked 1 and
** re-posted once the wanted message has been found. */
void cprobe(int type)
{
    int slot, k;

    /* Make sure all pending writes in application have occurred */
    fflush(stdout);
    fflush(stderr);

    /* Any type will do */
    if (type == -1) {
        getmsg();
        return;
    }

    /* The selected message already has the requested type */
    if ((next_message != -1) && (type == buffer[next_message].type)) {
        return;
    }

    for (;;) {
        /* Wait for a message for me */
        semcall_one(msgavail, mybuffer(), -1);

        /* Look for a message that is for me and of the type I need */
        for (slot = 0; slot < numbuffers(); slot++) {
            if (buffer[slot].valid[mybuffer()] != 1) {
                continue;
            }
            if ((buffer[slot].dnode != mynode()) && (buffer[slot].dnode != -1)) {
                continue;
            }
            if (buffer[slot].type != type) {
                /* Mark the message so that we don't look at it again */
                buffer[slot].valid[mybuffer()] = 2;
                continue;
            }

            next_message = slot;
            /* Put all skipped messages back */
            for (k = 0; k < numbuffers(); k++) {
                if (buffer[k].valid[mybuffer()] == 2) {
                    buffer[k].valid[mybuffer()] = 1;
                    semcall_one(msgavail, mybuffer(), 1);
                }
            }
            return;
        }
    }
}
/* infocount -- Return the length of the message that will be received. */
int infocount(void)
{
getmsg();
return buffer[next_message].t_length;
}
/* infonode -- Returns the node that sent the message */
int infonode(void)
{
getmsg();
return buffer[next_message].snode;
}
/* infopid -- Returns the group (pid) of the node that sent the message */
int infopid(void)
{
getmsg();
return buffer[next_message].spid;
}
/* csend -- Synchronous message sending between two nodes. If the target node
** number is -1, then the message is broadcasted to all nodes. Limitations:
** Assumes that the message buffer is free to use. In other words, if
** an asynchronous send was previously done, we assume that a msgwait
** was done to ensure the previous message reached its destination. */
int csend(int type, void *msg, int length, int target_node, int group)
{
int i,j, sent_length = 0;
char *source;
i = mybuffer();
/* Fill in the message */
source = msg;
buffer[i].type = type;
buffer[i].dnode = target_node;
buffer[i].spid = mypid();
buffer[i].snode = mynode();
buffer[i].t_length = length;
while (length > 0) {
/* Divide the message into smaller chunks */
buffer[i].length = min(MAX_MESSAGE_SIZE, length);
memcpy(buffer[i].msg, source, buffer[i].length);
source += buffer[i].length;
sent_length += buffer[i].length;
length -= buffer[i].length;
/* Tell the node(s) the message is there */
if (target_node == -1) {
/* Broadcast the message to nodes in this partition */
/* and to the process manager */
for (j=0; j < nodes&us.in&us.part + 1; j++) {
buffer[i].valid[j] = 1;
}
semcall_all(msgavail,nodes&us.in&us.part+1, 1);
/* Of course, we already have the message */
semcall_one(msgavail,i, -1);
/* Wait until everyone receives the message */
semcall_one(msgfree, i, -nodes&us.in&us.part);
} else {
/* Point to point to another node */
j = bufferof(target_node);
buffer[i].valid[j] = 1;
semcall_one(msgavail, j, 1);
/* Wait until it is received */
semcall_one(msgfree, i, -1);
}
}
return sent_length;
}
/* crecv -- Synchronous message reception between two nodes.
** Waits for a message of the given type and copies at most len bytes of it
** into buf, collecting every piece of a message that was sent in chunks.
** Pieces arriving from a different sender in the meantime are marked 3, set
** aside, and made available again before returning.
** Returns the total length of the message as declared by the sender, which
** may be larger than len. */
int crecv(int type, void *buf, int len)
{
    int recv_len = 0, copy_len = 0, temp_len, total_len;
    int recv_node;
    int i;
    char *target;

    /* Get a message of this type */
    cprobe(type);
    target = buf;
    total_len = buffer[next_message].t_length;
    recv_node = buffer[next_message].snode;
    do {
        if (recv_node != buffer[next_message].snode) {
            /* Message is from another node, put off receiving */
            buffer[next_message].valid[mybuffer()] = 3;
        } else {
            /* Message is from same node, add it to the previous messages */
            recv_len += buffer[next_message].length;
            temp_len = min(len - copy_len, buffer[next_message].length);
            if (temp_len > 0) {
                memcpy(target, buffer[next_message].msg, temp_len);
                target += temp_len;
            }
            copy_len += buffer[next_message].length;
            /* Acknowledge the receipt of the message */
            buffer[next_message].valid[mybuffer()] = 0;
            semcall_one(msgfree, next_message, 1);
        }
        /* Indicate that no message has been selected */
        next_message = -1;
        if (recv_len < total_len) {
            cprobe(type);
        }
    } while (recv_len < total_len);

    /* Scan buffers to restore any skipped messages */
    for (i = 0; i < numbuffers(); i++) {
        if (buffer[i].valid[mybuffer()] == 3) {
            buffer[i].valid[mybuffer()] = 1;
            semcall_one(msgavail, mybuffer(), 1);
        }
    }
    return total_len;
}
/* isend -- Asynchronous message sending between two nodes. If the target node
** number is -1, then the message is broadcasted to all nodes. Limitations:
** Assumes that the message buffer is free to use. In other words, if
** an asynchronous send was previously done, we assume that a msgwait
** was done to ensure the previous message reached its destination.
**
** Works like csend except that the acknowledgement of the last piece is not
** waited for here; the caller collects it with msgwait(). The group argument
** is not used. Returns the index of this process's buffer, which is the
** identifier to pass to msgwait().
** NOTE(review): nothing is posted when length is 0, so a later msgwait()
** would wait for an acknowledgement that never comes -- confirm callers
** never send zero-length messages. */
int isend(int type, void *msg, int length, int target_node, int group)
{
    int i,j, sent_length = 0;
    char *source;

    i = mybuffer();
    /* Start copying from the beginning of the caller's data */
    source = msg;
    buffer[i].type = type;
    buffer[i].dnode = target_node;
    buffer[i].spid = mypid();
    buffer[i].snode = mynode();
    buffer[i].t_length = length;
    while (length > 0) {
        /* Divide the message into smaller chunks */
        buffer[i].length = min(MAX_MESSAGE_SIZE, length);
        memcpy(buffer[i].msg, source, buffer[i].length);
        source += buffer[i].length;
        sent_length += buffer[i].length;
        length -= buffer[i].length;
        /* Tell the node(s) the message is there */
        if (target_node == -1) {
            /* Broadcast the message to nodes in this partition */
            /* and to the process manager */
            for (j=0; j < nodes_in_part+1; j++) {
                buffer[i].valid[j] = 1;
            }
            semcall_all(msgavail,nodes_in_part+1, 1);
            /* Of course, we already have the message */
            semcall_one(msgavail,i, -1);
            /* Wait for acknowledge on all but the last part */
            if (length > 0) {
                /* Wait until everyone receives the message */
                semcall_one(msgfree, i, -nodes_in_part);
            }
        } else {
            /* Point to point to another node */
            j = bufferof(target_node);
            buffer[i].valid[j] = 1;
            semcall_one(msgavail, j, 1);
            /* Wait for acknowledge on all but the last part */
            if (length > 0) {
                /* Wait until it is received */
                semcall_one(msgfree, i, -1);
            }
        }
    }
    /* Return which buffer needs to be waited on */
    return i;
}
/* irecv -- Asynchronous message reception between two nodes. Returns message
** identifier for acknowledging the message. */
int irecv(int type, void *buf, int len)
{
int mid;
int recv_len = 0, copy_len = 0, temp_len, total_len;
char *target;
/* Get a message of this type */
cprobe(type);
mid = next_message;
target = buf;
total_len = buffer[next_message].t_length;
do {
recv_len += buffer[next_message].length;
temp_len = min(len - copy_len, buffer[next_message].length);
if (temp_len > 0) {
memcpy(target, buffer[next_message].msg, temp_len);
target += temp_len;
}
copy_len += buffer[next_message].length;
/* Acknowledge all but last partial message */
if (recv_len < total_len) {
/* Acknowledge the receipt of the message */
buffer[next_message].valid[mybuffer()] = 0;
semcall_one(msgfree, next_message, 1);
}
/* Indicate that no message has been selected */
next_message = -1;
if (recv_len < total_len) {
cprobe(type);
}
} while (recv_len < total_len);
return mid;
}
/* msgwait -- Wait for a message to be received by the target node(s) */
void msgwait(int mid)
{
if (mid == mybuffer()) {
/* Then it was a send to another node */
if (buffer[mid].dnode == -1) {
/* Wait for everyone to receive the message */
semcall_all(msgfree, mid, -nodes&us.in&us.part);
} else {
semcall_one(msgfree, mid, -1);
}
} else {
/* It was a receive from another node */
semcall_one(msgfree, mid, 1);
}
}
/* flushmsg -- Forces the removal of pending messages to a node.
** Not implemented in the simulator: no messages are removed. The call only
** flushes stdout and stderr. */
void flushmsg(int type, int target_node, int group)
{
    (void)type;
    (void)target_node;
    (void)group;

    /* Do nothing for now */
    fflush(stdout);
    fflush(stderr);
}
/* mclock -- Return time in milliseconds.
** The value is the calendar time from time() multiplied by 1000, so it only
** changes once per second. */
unsigned long mclock(void)
{
    /* time() works on a time_t, which need not be an unsigned long */
    time_t now = time(NULL);

    return (unsigned long)now * 1000;
}